from google.protobuf.json_format import MessageToDict
from kafka import KafkaConsumer


class ConsumerClass:

    def __init__(self, ip_address: str, topic: str):
        self.ip_address = ip_address
        self.topic = topic
        self.list_messages = []

    def read_msg_consumer(self):
        kafka_consumer = KafkaConsumer(
            topics=self.topic,
            api_version=(0, 10),
            consumer_timeout_ms=1000,
            enable_auto_commit=True,
            auto_offset_reset="earliest"
        )
        kafka_consumer.poll()

        for msg in kafka_consumer:
            key = msg.key
            data_msg = msg.value

            pattern_msg = {"key": key, "value": data_msg}
            self.list_messages.append(pattern_msg)

        if kafka_consumer is not None:
            kafka_consumer.close()

    def read_msg_consumer_proto(self, pattern_proto_msg):
        kafka_consumer = KafkaConsumer(
            topics=self.topic,
            api_version=(0, 10),
            consumer_timeout_ms=1000,
            enable_auto_commit=True,
            auto_offset_reset="earliest"
        )
        kafka_consumer.poll()

        for msg in kafka_consumer:
            key_for_decode = msg.key
            data_msg = msg.value
            dict_from_msg = None
            if type(key_for_decode) is bytes:
                key_for_decode = key_for_decode.decode()
            if type(data_msg) is bytes:
                data_msg_proto = pattern_proto_msg
                data_msg_proto.ParseFromString(data_msg)
                dict_from_msg = MessageToDict(data_msg_proto)
            pattern_msg = {"key": key_for_decode, "value": dict_from_msg}
            self.list_messages.append(pattern_msg)

        if kafka_consumer is not None:
            kafka_consumer.close()
